kafka: add MATCHING broker rules for PrivateLink AZ mapping#36161
kafka: add MATCHING broker rules for PrivateLink AZ mapping#36161jubrad wants to merge 1 commit intoMaterializeInc:mainfrom
Conversation
02efa8f to
924a1ff
Compare
|
Note on On main, This PR moves it to a standalone function in This is intentional — one function handles all connection types rather than maintaining two parallel implementations. |
| /// If true, allow any combination of characters after the literal match. | ||
| pub suffix_wildcard: bool, |
There was a problem hiding this comment.
I'm wondering if we actually need a suffix, if this feature is restricted to confluent, confluent provides a suffix, so we only need prefix wildcard matching. I don't think this hurts, but it's not strictly necessary.
| SECURITY PROTOCOL PLAINTEXT | ||
| ); | ||
| contains:must set one of BROKER, BROKERS, or AWS PRIVATELINK | ||
| contains:must set one of BROKER, BROKERS, BOOTSTRAP BROKER, or AWS PRIVATELINK |
There was a problem hiding this comment.
I think this makes the syntax for this feature quite clear, but I'm very glad we got rid of the railroad track diagram because the syntax is getting wild.
- BROKER implies a single broker, it's exclusive with BROKERS, but both set bootstrap broker
- AWS PRIVATELINK only works when a single top level privatelink is provided with no individual broker matching, although perhaps one could provide BROKER
- BROKERS is the standard way for mapping brokers to aws privatelink for everything but red panda
I really don't know the best way to land things here.
- keep things as they are in this PR
- allow broker and brokers where broker is bootstrap and optional, but we error if not static broker is provided
- we get rid of bootstrap brokers and require at least one static broker if the user specifies BROKERS
There was a problem hiding this comment.
What's the "railroad track diagram"?
Re: We need at least one static broker to bootstrap from.
get rid of BOOTSTRAP BROKER and require at least one static broker in BROKERS
I like this one. It feels clearer than "if you only have MATCHING in BROKERS, you also need a BOOTSTRAP BROKER".
Then BROKER is subsumed by BROKERS and we can just about replace AWS PRIVATELINK with a static broker (static address = PrivateLink address) and a MATCHING '*' rule.
Not saying we remove the other ones, but at least we can focus on just BROKERS in docs, etc.
There was a problem hiding this comment.
What's the "railroad track diagram"?
We used to generate diagrams for all SQL using BNF that we had to maintain along all SQL changes these diagrams would outline the SQL sytnax which, in this case would be super complicated.
get rid of BOOTSTRAP BROKER and require at least one static broker in BROKERS
I like this one
Cool, I'll make this change
def-
left a comment
There was a problem hiding this comment.
mode cockroach
simple conn=mz_system,user=mz_system
ALTER SYSTEM SET max_aws_privatelink_connections = 10;
----
COMPLETE 0
statement ok
CREATE CONNECTION pl TO AWS PRIVATELINK (
SERVICE NAME 'com.amazonaws.vpce.us-east-1.vpce-svc-0e123abc123198abc',
AVAILABILITY ZONES ('use1-az1', 'use1-az4')
);
statement error cannot specify BROKER, BOOTSTRAP BROKER, or static BROKERS entries alongside top-level AWS PRIVATELINK
CREATE CONNECTION kafka_bad TO KAFKA (
BROKER 'kafka:9092',
AWS PRIVATELINK pl (PORT 9092),
SECURITY PROTOCOL PLAINTEXT
);
Fails with a panic:
2026-04-22T02:23:43.120459Z ERROR mz_adapter::coord::sequencer::inner: connection validation panicked
Also this one:
mode cockroach
simple conn=mz_system,user=mz_system
ALTER SYSTEM SET max_aws_privatelink_connections = 10;
----
COMPLETE 0
simple conn=mz_system,user=mz_system
ALTER SYSTEM SET enable_kafka_broker_matching_rules = true;
----
COMPLETE 0
statement ok
CREATE CONNECTION pl TO AWS PRIVATELINK (
SERVICE NAME 'com.amazonaws.vpce.us-east-1.vpce-svc-0e123abc123198abc',
AVAILABILITY ZONES ('use1-az1', 'use1-az4')
);
statement error BOOTSTRAP BROKER is required when using MATCHING broker rules
CREATE CONNECTION kafka_matching_no_bootstrap TO KAFKA (
BROKERS (
MATCHING '*.use1-az1.*' USING AWS PRIVATELINK pl (AVAILABILITY ZONE = 'use1-az1'),
MATCHING '*.use1-az4.*' USING AWS PRIVATELINK pl (AVAILABILITY ZONE = 'use1-az4')
),
SECURITY PROTOCOL PLAINTEXT
) WITH (VALIDATE = FALSE);
Fails too, we should reject CREATE CONNECTION without a BOOTSTRAP BROKER:
CREATE CONNECTION kafka_matching_no_bootstrap TO KAFKA (
BROKERS (
MATCHING '*.use1-az1.*' USING AWS PRIVATELINK pl (AVAILABILITY ZONE = 'use1-az1'),
MATCHING '*.use1-az4.*' USING AWS PRIVATELINK pl (AVAILABILITY ZONE = 'use1-az4')
),
SECURITY PROTOCOL PLAINTEXT
) WITH (VALIDATE = FALSE);
UnexpectedPlanSuccess:test/sqllogictest/kafka_matching_requires_bootstrap_broker.
924a1ff to
a1fc2ad
Compare
98a1e05 to
a9e6f38
Compare
| scx, | ||
| self.ssh_tunnel, | ||
| self.aws_privatelink, | ||
| if matching_rules.is_empty() { None } else { Some(matching_rules) }, |
4b4f190 to
c9e6e0b
Compare
def-
left a comment
There was a problem hiding this comment.
Minor thing, we should reject invalid MATCHING strings:
diff --git a/src/sql-parser/tests/testdata/ddl b/src/sql-parser/tests/testdata/ddl
index ea60378216..265a3608b2 100644
--- a/src/sql-parser/tests/testdata/ddl
+++ b/src/sql-parser/tests/testdata/ddl
@@ -671,6 +671,37 @@ error: Expected USING, found right parenthesis
CREATE CONNECTION kafka_connection TO KAFKA (BROKERS (MATCHING '*.az.*'))
^
+# Regression: MATCHING patterns must reject internal `*` wildcards. Only a
+# single leading and/or trailing `*` is supported; anything else silently
+# produces a rule that can never match a real broker hostname.
+parse-statement
+CREATE CONNECTION kafka_connection TO KAFKA (BROKERS (MATCHING '*foo*bar*' USING AWS PRIVATELINK privatelink_svc (AVAILABILITY ZONE 'use1-az1')))
+----
+error: pattern may only contain `*` as a leading and/or trailing wildcard
+CREATE CONNECTION kafka_connection TO KAFKA (BROKERS (MATCHING '*foo*bar*' USING AWS PRIVATELINK privatelink_svc (AVAILABILITY ZONE 'use1-az1')))
+ ^
+
+parse-statement
+CREATE CONNECTION kafka_connection TO KAFKA (BROKERS (MATCHING 'foo*bar' USING AWS PRIVATELINK privatelink_svc (AVAILABILITY ZONE 'use1-az1')))
+----
+error: pattern may only contain `*` as a leading and/or trailing wildcard
+CREATE CONNECTION kafka_connection TO KAFKA (BROKERS (MATCHING 'foo*bar' USING AWS PRIVATELINK privatelink_svc (AVAILABILITY ZONE 'use1-az1')))
+ ^
+
+parse-statement
+CREATE CONNECTION kafka_connection TO KAFKA (BROKERS (MATCHING '**az1**' USING AWS PRIVATELINK privatelink_svc (AVAILABILITY ZONE 'use1-az1')))
+----
+error: pattern may only contain `*` as a leading and/or trailing wildcard
+CREATE CONNECTION kafka_connection TO KAFKA (BROKERS (MATCHING '**az1**' USING AWS PRIVATELINK privatelink_svc (AVAILABILITY ZONE 'use1-az1')))
+ ^
+
+parse-statement
+CREATE CONNECTION kafka_connection TO KAFKA (BROKERS (MATCHING 'broker-*-az1-*.host.com' USING AWS PRIVATELINK privatelink_svc (AVAILABILITY ZONE 'use1-az1')))
+----
+error: pattern may only contain `*` as a leading and/or trailing wildcard
+CREATE CONNECTION kafka_connection TO KAFKA (BROKERS (MATCHING 'broker-*-az1-*.host.com' USING AWS PRIVATELINK privatelink_svc (AVAILABILITY ZONE 'use1-az1')))
+ ^
+
parse-statement
CREATE SOURCE mz_source FROM MYSQL CONNECTION mysqlconn FOR TABLES (foo, bar as qux, baz into zop);
----Runs with cargo test -p mz-sql-parser --test sqlparser_common datadriven
Introduces two new SQL constructs for Kafka PrivateLink connections:
- `BOOTSTRAP BROKER 'addr' USING AWS PRIVATELINK conn (...)` — provides
the initial bootstrap address with an explicit PrivateLink tunnel. The
bootstrap address is used as `bootstrap.servers` and the real hostname
is preserved for correct TLS SNI.
- `MATCHING 'pattern' USING AWS PRIVATELINK conn (...)` inside `BROKERS`
— pattern-based routing rules for dynamically discovered brokers.
After the initial metadata fetch, Kafka returns broker addresses that
may differ from the bootstrap address (e.g., AZ-specific hostnames).
MATCHING rules route these through the correct PrivateLink endpoint.
This replaces the `AWS PRIVATELINKS` syntax which used exact-match
patterns for dual-purpose bootstrap/routing and a separate `TO` keyword
inconsistent with the existing `USING AWS PRIVATELINK` syntax.
Example:
```sql
CREATE CONNECTION kafka TO KAFKA (
BOOTSTRAP BROKER 'lkc-825730.endpoint.cloud:9092'
USING AWS PRIVATELINK pl_conn (AVAILABILITY ZONE 'use1-az1'),
BROKERS (
MATCHING '*use1-az1*' USING AWS PRIVATELINK pl_conn (AVAILABILITY ZONE 'use1-az1'),
MATCHING '*use1-az4*' USING AWS PRIVATELINK pl_conn (AVAILABILITY ZONE 'use1-az4')
),
SASL MECHANISMS 'PLAIN',
SASL USERNAME 'key',
SASL PASSWORD SECRET secret,
SECURITY PROTOCOL 'SASL_SSL'
);
```
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
c9e6e0b to
3f22bb0
Compare
Follow up of #35455
Summary
BOOTSTRAP BROKER 'addr' USING AWS PRIVATELINK conn (...)— new top-level option that provides the initial bootstrap address with an explicit PrivateLink tunnel, preserving the real hostname for correct TLS SNIMATCHING 'pattern' USING AWS PRIVATELINK conn (...)insideBROKERS (...)— pattern-based routing rules for dynamically discovered brokers returned in Kafka metadataAWS PRIVATELINKSsyntax which used aTOkeyword inconsistent with the existingUSING AWS PRIVATELINKsyntax and overloaded exact-match patterns as implicit bootstrap addressesExample
CREATE CONNECTION kafka TO KAFKA ( BROKERS ( 'lkc-825730.endpoint.cloud:9092' USING AWS PRIVATELINK pl_conn, MATCHING '*use1-az1*' USING AWS PRIVATELINK pl_conn (AVAILABILITY ZONE 'use1-az1'), MATCHING '*use1-az4*' USING AWS PRIVATELINK pl_conn (AVAILABILITY ZONE 'use1-az4'), MATCHING '*use1-az6*' USING AWS PRIVATELINK pl_conn (AVAILABILITY ZONE 'use1-az6') ), SASL MECHANISMS 'PLAIN', SASL USERNAME 'key', SASL PASSWORD SECRET secret, SECURITY PROTOCOL 'SASL_SSL' );Test plan
cargo checkpassesDocs
🤖 Generated with Claude Code